[Tips] Firehoseで吐いたJSONが繋がったファイルをPythonでパースしてみる
CX事業本部の夏目です。
IoTのバックエンドをサーバーレスで開発しているのですが、デバイスからMQTTで上がってくるデータをIoT Rule -> Kinesis Data Strem -> Kinesis Firehose -> S3 Bucket
といった形で吐き出させています。
しかしこのデータ、JSONの形式とは一致しないため、パースするには工夫が必要です。
今回はPythonでパースする方法を共有します。
データ
{"payload":{"id":"4d497f23-41c8-4488-8a45-099875a83106"},"timestamp":1581080246231}{"payload":{"id":"546c0712-2c93-424c-8350-f7ab3aaa3117"},"timestamp":1581080246785}{"payload":{"id":"9c385ea9-e507-4ae1-9e16-820df50c9786"},"timestamp":1581080247343}{"payload":{"id":"41edb0ab-24e4-445f-a84b-912e4bd38a61"},"timestamp":1581080247896}{"payload":{"id":"1c56d473-75e0-4518-bb2c-eb7d08c15c0c"},"timestamp":1581080248499}{"payload":{"id":"a51bf3e6-cb26-465c-b608-d2c0688289b3"},"timestamp":1581080249171}{"payload":{"id":"3021c568-8695-49c0-bae2-493145427ace"},"timestamp":1581080249751}{"payload":{"id":"073e05a2-d803-480a-b0ff-ce8dd25c1500"},"timestamp":1581080250308}{"payload":{"id":"71ac5a8c-a7b8-41a9-a756-764023c0e2e3"},"timestamp":1581080250911}{"payload":{"id":"16fab82e-66d1-4591-93cd-edfcf9477347"},"timestamp":1581080251466}{"payload":{"id":"cf865487-7847-428a-b593-51f193e99259"},"timestamp":1581080252017}{"payload":{"id":"f8d7dfdf-7a1a-4101-96d4-5dc3ead1bd76"},"timestamp":1581080252586}{"payload":{"id":"47928637-6c26-4cef-8c95-10ec1fe0d57f"},"timestamp":1581080253160}
複数のJSONドキュメントをただ連結しただけのものです。
MQTTでJSONデータを受け取ってFirehoseでS3に保存するとこうなります。
Pythonで読み込む
from json.decoder import WHITESPACE, JSONDecoder from typing import Generator def load_iter(text: str) -> Generator: size = len(text) decoder = JSONDecoder() index = 0 while index < size: obj, offset = decoder.raw_decode(text, index) yield obj search = WHITESPACE.search(text, offset) if search is None: break index = search.end() if __name__ == '__main__': text = open('path').read() for obj in load_iter(text): print('===') print(obj)
load_iter
という関数でデータのパースを行っています。
PythonのGeneratorを使用しているので、そのままループで処理させることができます。
ちなみに実行結果はこうなります。
=== {'payload': {'id': '4d497f23-41c8-4488-8a45-099875a83106'}, 'timestamp': 1581080246231} === {'payload': {'id': '546c0712-2c93-424c-8350-f7ab3aaa3117'}, 'timestamp': 1581080246785} === {'payload': {'id': '9c385ea9-e507-4ae1-9e16-820df50c9786'}, 'timestamp': 1581080247343} === {'payload': {'id': '41edb0ab-24e4-445f-a84b-912e4bd38a61'}, 'timestamp': 1581080247896} === {'payload': {'id': '1c56d473-75e0-4518-bb2c-eb7d08c15c0c'}, 'timestamp': 1581080248499} === {'payload': {'id': 'a51bf3e6-cb26-465c-b608-d2c0688289b3'}, 'timestamp': 1581080249171} === {'payload': {'id': '3021c568-8695-49c0-bae2-493145427ace'}, 'timestamp': 1581080249751} === {'payload': {'id': '073e05a2-d803-480a-b0ff-ce8dd25c1500'}, 'timestamp': 1581080250308} === {'payload': {'id': '71ac5a8c-a7b8-41a9-a756-764023c0e2e3'}, 'timestamp': 1581080250911} === {'payload': {'id': '16fab82e-66d1-4591-93cd-edfcf9477347'}, 'timestamp': 1581080251466} === {'payload': {'id': 'cf865487-7847-428a-b593-51f193e99259'}, 'timestamp': 1581080252017} === {'payload': {'id': 'f8d7dfdf-7a1a-4101-96d4-5dc3ead1bd76'}, 'timestamp': 1581080252586} === {'payload': {'id': '47928637-6c26-4cef-8c95-10ec1fe0d57f'}, 'timestamp': 1581080253160}
まとめ
いかがでしょうか。 Firehoseで吐いたデータを簡単にパースできました。
機会があったら使ってみてください。